package com.atguigu.flink.sql.connector;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Created by Smexy on 2023/11/20
 */
public class Demo3_ReadKafka
{
    /**
     * DDL for the Kafka-backed table {@code t1}.
     *
     * <p>Beyond the payload columns ({@code id}, {@code ts}, {@code vc}) it also
     * exposes connector-provided metadata as columns. Metadata columns require the
     * {@code METADATA} keyword; {@code VIRTUAL} marks them read-only (excluded on
     * insert). Which metadata keys exist depends on the connector — here the Kafka
     * connector supplies {@code timestamp}, {@code topic}, {@code partition} and
     * {@code offset}.
     */
    private static final String KAFKA_TABLE_DDL = String.join("",
        "CREATE TABLE t1 (",
        "  id STRING,",
        "  ts BIGINT,",
        "  vc INT ,",
        " `timestamp` TIMESTAMP_LTZ(3) METADATA VIRTUAL,",
        " `topic` STRING METADATA VIRTUAL,",
        "  `partition` BIGINT METADATA VIRTUAL,",
        "  `offset` BIGINT METADATA VIRTUAL ",
        ")  WITH (",
        " 'connector' = 'kafka',",
        "  'topic' = 't3',",
        "  'properties.bootstrap.servers' = 'hadoop102:9092',",
        "  'properties.group.id' = 'testGroup',",
        "  'scan.startup.mode' = 'earliest-offset',",
        "  'format' = 'json' ",
        ")");

    /**
     * Registers the Kafka table and prints the result of {@code select * from t1}.
     *
     * <p>Reading from an external system yields a Table directly, so no DataStream
     * environment is needed — a plain {@link TableEnvironment} in streaming mode
     * is sufficient.
     */
    public static void main(String[] args) {

        EnvironmentSettings streamingSettings =
            EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(streamingSettings);

        // Register the source table; executeSql runs the DDL eagerly.
        tableEnv.executeSql(KAFKA_TABLE_DDL);

        // Run the query and stream its rows to stdout.
        var result = tableEnv.sqlQuery("select * from t1");
        result.execute().print();
    }
}
